package com.yk.dataGatherer.controller;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.yk.api.dataGatherer.dto.EmqMessage;
import com.yk.api.system.dto.GatewayDTO;
import com.yk.common.core.constant.CacheConstants;
import com.yk.common.core.constant.MqttConstants;
import com.yk.common.core.dto.GateWayUpdateDTO;
import com.yk.common.core.utils.JsonUtils;
import com.yk.common.rabbitmq.constant.QueueConstants;
import com.yk.common.redis.service.RedisService;
import com.yk.dataGatherer.service.EmqWebHookService;
import com.yk.dataGatherer.websocket.DebuggerWebSocketServer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Objects;


/**
 * emq/webhook
 *
 * @author lmx
 * @date 2023/10/16 18:15
 */
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/emq")
@Api(tags = "emqWebhook")
public class EmqWebHookController {

    private final EmqWebHookService emqWebHookService;
    private final RedisService redisService;
    private final AmqpTemplate amqpTemplate;

    /**
     * 处理emq/webhook消息
     *
     * @param body emq/webhook消息
     */
    @PostMapping("/webhookMessage")
    @ApiOperation(value = "处理emq/webhook消息")
    public void handleWebhookMessage(@RequestBody String body) {
        try {
            EmqMessage emqMessage = JSONUtil.toBean(body, EmqMessage.class);
            String topic = emqMessage.getTopic();
            String[] split = topic.split("/");
            // 操作类型 DATA/CONFIG_ACK/WRITE_ACK
            String mqttStatus = split[split.length - 1];
            // 查询网关
            String substring = topic.substring(0, topic.lastIndexOf("/"));
            substring += "/data";
            GatewayDTO gateway = redisService.getCacheObject(CacheConstants.GATEWAY_KEY + substring);
            if (Objects.isNull(gateway) || Objects.isNull(gateway.getId())) {
                return;
            }
            // 调试信息
            DebuggerWebSocketServer.sendMessage(gateway.getId().toString(), emqMessage.getPayload());
            JSONObject data = JsonUtils.parseObject(emqMessage.getPayload(), new TypeReference<JSONObject>() {});
            emqWebHookService.fetchData(mqttStatus, data, gateway, emqMessage.getClientid());
        }catch (Exception e){
            log.error("emq/webhook消息处理失败，{}", e.getMessage());
        }
    }

    /**
     * 处理emq/webhook离线消息
     *
     * @param body emq/webhook离线消息
     */
    @PostMapping("/offlineMessage")
    @ApiOperation(value = "处理emq/webhook消息")
    public void offline(@RequestBody String body) {
        try {
            JSONObject data = JSONUtil.parseObj(body);
            String clientId = data.getStr(MqttConstants.CLIENT_ID);
            // 网关离线状态更新
            amqpTemplate.convertAndSend(QueueConstants.GATEWAY_OFFLINE_UPDATE, new GateWayUpdateDTO(clientId));
        }catch (Exception e) {
            log.error("emq/webhook离线消息处理失败", e);
        }
    }

}
